package com.shujia.base;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/*
    jdbc连接数据库6步：
        1、注册驱动
        2、创建数据库连接对象
        3、创建数据库操作对象
        4、对数据库进行操作
        5、解析查询结果
        6、关闭释放资源

    Hbase api中基础我们要做的任务：
        1、如何创建一张表
        2、如何删除一张表
        3、如何向hbase表中插入一列数据
        4、如何向hbase表中插入一批数据
        5、如何获取一列数据
        6、如何获取批量列数据
        7、如何创建预分region表
 */
public class HbaseAPI {
    private Connection conn;
    private Admin admin;

    @Before
    public void init() {
        try {
            //获取集群环境配置对象
            //老版本的获取配置文件对象的做法
            //Deprecated
            //since 0.90.0. Please use create(Configuration) instead.
//        HBaseConfiguration conf = new HBaseConfiguration();
            //新版本做法
            Configuration conf = HBaseConfiguration.create();
            //配置zookeeper集群信息
            conf.set("hbase.zookeeper.quorum", "master,node1,node2");

            //获取数据库连接对象
            conn = ConnectionFactory.createConnection(conf);
            System.out.println("hbase数据库连接对象：" + conn);

            //获取数据库操作对象
            admin = conn.getAdmin();
            System.out.println("hbase数据库操作对象：" + admin);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 1、如何创建一张表
     * 建表至少指定表名和列簇名
     * create 'students','info'
     */
    @Test
    public void createOneTable() {
        try {
            TableName tn = TableName.valueOf("students2");
            //应该在创建表之前先判断表是否存在
            boolean b = admin.tableExists(tn);
            if (!b) {
                //老版本创建表描述器对象
                //public HTableDescriptor(final TableName name)
//            HTableDescriptor hTableDescriptor = new HTableDescriptor(tn);
                //hbase2.0.0版本之后推荐使用新的方式创建表描述器对象
                //Deprecated
                //As of release 2.0.0, this will be removed in HBase 3.0.0.
                //Use TableDescriptorBuilder to build HTableDescriptor.
                TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tn);

                //老版本创建列簇描述器并加入到表描述器中的做法
                //创建列簇描述器
                //Deprecated
                //As of release 2.0.0, this will be removed in HBase 3.0.0 (HBASE-18433  ).
                //Use ColumnFamilyDescriptorBuilder.of(String)
//            HColumnDescriptor info = new HColumnDescriptor("info");
//            //将列簇加入到表中
//            tableDescriptorBuilder.addColumnFamily(info);

                //新版本创建列簇描述器并加入到表描述器中的做法
                ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of("info");
                //设置多个列簇
//            tableDescriptorBuilder.setColumnFamilies()
                //设置一个列簇
                tableDescriptorBuilder.setColumnFamily(info);

                //void createTable(TableDescriptor desc)
                admin.createTable(tableDescriptorBuilder.build());
            } else {
                System.out.println(Bytes.toString(tn.getName()) + "表已经存在！！");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 2、如何删除一张表
     * 先禁用 再删除
     */
    @Test
    public void dropOneTable() {
        try {
            //先将表名封装成TableName对象
            TableName tableName = TableName.valueOf("students");

            //先判断是否存在
            boolean b = admin.tableExists(tableName);
            if (b) {
                admin.disableTable(tableName);

                admin.deleteTable(tableName);
            } else {
                System.out.println(Bytes.toString(tableName.getName()) + "表不存在！");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 3、如何向hbase表中插入一列数据
     * 1500100001,施笑槐,22,女,文科六班
     * put 'students','1500100001','info:name','施笑槐'
     * put 'students','1500100001','info:age','22'
     * put 'students','1500100001','info:gender','女'
     * put 'students','1500100001','info:clazz','文科六班'
     * <p>
     * <p>
     * 命令中的put: 代表一列
     * api中的Put：代表一行，可以设置多列
     */
    @Test
    public void putOneData() {
        try {
            //先将表名封装成TableName对象
            TableName tableName = TableName.valueOf("students");

            //先判断是否存在
            boolean b = admin.tableExists(tableName);
            if (b) {
                //获取表对象
                Table students = conn.getTable(tableName);

                //将数据封装成Put对象
                //Put(byte [] row)
                Put put = new Put(Bytes.toBytes("1500101001"));

                //添加一列数据的方式1：
                //public Put addColumn(byte [] family, byte [] qualifier, byte [] value)
//                put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes("李刚"));
//                put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes("18"));
//                put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes("女"));
//                put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("clazz"),Bytes.toBytes("文科一班"));

                //添加一列数据的方式2：将每一列封装成Cell对象
                //public Put add(Cell cell)
                //KeyValue是Cell接口的实现类
                //public KeyValue(final byte [] row, final byte [] family,final byte [] qualifier, final byte [] value)
                put.add(new KeyValue(Bytes.toBytes("1500101001"), Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("李刚")));
                put.add(new KeyValue(Bytes.toBytes("1500101001"), Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18")));
                put.add(new KeyValue(Bytes.toBytes("1500101001"), Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes("女")));
                put.add(new KeyValue(Bytes.toBytes("1500101001"), Bytes.toBytes("info"), Bytes.toBytes("clazz"), Bytes.toBytes("文科一班")));

                //void put(Put put) 添加一行数据
                //void put(List<Put> puts) 添加多行数据
                students.put(put);
            } else {
                System.out.println(Bytes.toString(tableName.getName()) + "表不存在！");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 4、如何向hbase表中插入一批数据
     */
    @Test
    public void putMoreData() {
        try {
            //先将表名封装成TableName对象
            TableName tableName = TableName.valueOf("students2");

            //先判断是否存在
            boolean b = admin.tableExists(tableName);
            if (b) {
                //获取表对象
                Table students = conn.getTable(tableName);
                //创建一个Put对象的List集合
                ArrayList<Put> putList = new ArrayList<>();

                //读取本地文件
                BufferedReader br = new BufferedReader(new FileReader("data/student.txt"));
                String line = null;
                Put put;
                while ((line = br.readLine()) != null) {
                    String[] infos = line.split(",");
                    put = new Put(Bytes.toBytes(infos[0]));
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(infos[1]));
                    put.addColumn(Bytes.toBytes("inter"), Bytes.toBytes("age"), Bytes.toBytes(infos[2]));
                    put.addColumn(Bytes.toBytes("hadoop"), Bytes.toBytes("gender"), Bytes.toBytes(infos[3]));
                    put.addColumn(Bytes.toBytes("oppo"), Bytes.toBytes("clazz"), Bytes.toBytes(infos[4]));

                    //将put对象添加到集合中
                    putList.add(put);
                }
                //void put(List<Put> puts) 添加多行数据
                students.put(putList);

                br.close();

            } else {
                System.out.println(Bytes.toString(tableName.getName()) + "表不存在！");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 5、如何获取一行数据
     * 命令里get：获取一列或者一行
     * 代码里的Get：获取一行
     */
    @Test
    public void getOneData() {
        try {
            //先将表名封装成TableName对象
            TableName tableName = TableName.valueOf("students");

            //先判断是否存在
            boolean b = admin.tableExists(tableName);
            if (b) {
                //获取表对象
                Table students = conn.getTable(tableName);
                Get get = new Get(Bytes.toBytes("1500101001"));
                //Result get(Get get) 获取一行数据
                Result result = students.get(get); // 这里是根据行键获取的一行数据，若要得到每一列，就需要解析result
                //获取列的方式1：getValue(列簇, 列名)
//                String id = Bytes.toString(result.getRow()); // 获取行键，使用Bytes工具转字符串
//                String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
//                String age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
//                String gender = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("gender")));
//                String clazz = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("clazz")));

                //获取列的方式2：listCells() 列出一行中所有的单元格
                List<Cell> cells = result.listCells();
                String id = Bytes.toString(result.getRow());
                StringBuilder sb = new StringBuilder();
                sb.append("学号：").append(id).append(", ");
                for (Cell cell : cells) {
                    //使用CellUtil工具类获取一个单元格中的内容
//                    String id = Bytes.toString(CellUtil.cloneRow(cell)); //获取一个单元格中的行键
                    String colName = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String colValue = Bytes.toString(CellUtil.cloneValue(cell));
//                    System.out.println("列名：" + colName + ", 列值：" + colValue);
                    sb.append(colName).append("：").append(colValue).append(", ");
                }
                String str = sb.toString();
                str = str.substring(0, str.length() - 2);
                System.out.println(str);

                //TODO:自己实现传入多个Get获取多行数据
                //Result[] get(List<Get> gets) 获取多条数据

            } else {
                System.out.println(Bytes.toString(tableName.getName()) + "表不存在！");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 6、如何获取批量列数据
     * scan 'students'
     */
    @Test
    public void scanData() {
        try {
            //先将表名封装成TableName对象
            TableName tableName = TableName.valueOf("students");

            //先判断是否存在
            boolean b = admin.tableExists(tableName);
            if (b) {
                //获取表对象
                Table students = conn.getTable(tableName);

                Scan scan = new Scan(); //若使用无参构造方法创建Scan对象，表示全表扫描
                //老版本写法
//                scan.setStartRow(Bytes.toBytes("1500100837")); //设置开始行键
//                scan.withStartRow(Bytes.toBytes("1500100837"));
//                scan.withStopRow(Bytes.toBytes("1500100848")); //设置结束行键

                scan.setLimit(10); //取前几条


                //ResultScanner getScanner(Scan scan)
                ResultScanner rs = students.getScanner(scan); // rs中存储了多行数据
                Iterator<Result> resultIterator = rs.iterator(); //获取所有行数据组成的迭代器
                while (resultIterator.hasNext()) { //判断下一行是否有数据
                    Result result = resultIterator.next(); //获取一行数据
                    //获取列的方式1：getValue(列簇, 列名)
//                    String id = Bytes.toString(result.getRow()); // 获取行键，使用Bytes工具转字符串
//                    String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
//                    String age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
//                    String gender = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("gender")));
//                    String clazz = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("clazz")));
//                    System.out.println("学号：" + id + ", 姓名：" + name + ", 年龄：" + age + ", 性别：" + gender + ", 班级" + clazz);

                    //获取列的方式2：listCells() 列出一行中所有的单元格
                    List<Cell> cells = result.listCells();
                    String id = Bytes.toString(result.getRow());
                    StringBuilder sb = new StringBuilder();
                    sb.append("学号：").append(id).append(", ");
                    for (Cell cell : cells) {
                        //使用CellUtil工具类获取一个单元格中的内容
//                    String id = Bytes.toString(CellUtil.cloneRow(cell)); //获取一个单元格中的行键
                        String colName = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String colValue = Bytes.toString(CellUtil.cloneValue(cell));
//                    System.out.println("列名：" + colName + ", 列值：" + colValue);
                        sb.append(colName).append("：").append(colValue).append(", ");
                    }
                    String str = sb.toString();
                    str = str.substring(0, str.length() - 2);
                    System.out.println(str);
                }


            } else {
                System.out.println(Bytes.toString(tableName.getName()) + "表不存在！");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *  7、如何创建预分region表【解决热点问题的一个方案】
     *  因为hbase将来可能会发生热点数据的问题，某一个RegionServer接收的处理请求太多了，与其它的RegionServer工作量产生明显的区别
     *  就会导致该RegionServer挂掉，我们可以采取将部分请求分散到其它的RegionServer上进行处理
     *
     *  分散的依据：
     *      1、hbase表中的数据，行键是根据字典顺序排序的，这就决定了我们添加数据的时候，根据行键的顺序，找到对应的region做添加
     *      2、若我们将一张表根据行键切分出多个region，由hmaster自动会对多个region进行负载均衡，重新分配
     *      3、结合第一点，将来我们的请求就会根据行键被分散到其它的regionserver上了
     *
     *      e  h  l   o  t
     */
    @Test
    public void createPreviewTable(){
        try {
            //先将表名封装成TableName对象
            TableName tableName = TableName.valueOf("split_t");

            //创建表描述器对象
            TableDescriptorBuilder splitT = TableDescriptorBuilder.newBuilder(tableName);

            //创建列簇描述器对象
//            HColumnDescriptor info = new HColumnDescriptor("info");
            ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of("info");

            //将列簇加入到表中
            splitT.setColumnFamily(info);

            //e  h  l   o  t
            byte[][] splits = {
                    Bytes.toBytes("e"),
                    Bytes.toBytes("h"),
                    Bytes.toBytes("l"),
                    Bytes.toBytes("o"),
                    Bytes.toBytes("t")
            };


            //创建普通hbase表
//            admin.createTable(splitT.build()); // 默认一开始只会提供一个region,只会被一个regionserver所管理
            //创建预分region表
            //createTable(TableDescriptor desc, byte[][] splitKeys)
            admin.createTable(splitT.build(), splits); // 切分出的region由master负载均衡，会有多个regionserver所管理

        }catch (Exception e){
            e.printStackTrace();
        }

    }



    //释放资源
    @After
    public void closeZiYuan() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (conn != null) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


}
